#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import absolute_import

import inspect

import pandas as pd

from apache_beam.dataframe import expressions


class DeferredFrame(object):
  """Wrapper around an expression, with a registry of deferred wrapper types.

  The registry (`_pandas_type_map`) is a single dict defined on this class
  and mutated in place, so registrations are visible through every subclass.
  """

  # Maps the type passed to `_register_for` to the deferred type registered
  # for it.
  _pandas_type_map = {}

  def __init__(self, expr):
    """Stores `expr` as the expression backing this deferred object."""
    self._expr = expr

  @classmethod
  def _register_for(cls, pandas_type):
    """Returns a class decorator that registers a type for `pandas_type`.

    The decorated type is stored in `_pandas_type_map` under `pandas_type`
    and returned unchanged.
    """
    def register(deferred_type):
      cls._pandas_type_map[pandas_type] = deferred_type
      return deferred_type

    return register

  @classmethod
  def wrap(cls, expr):
    """Wraps `expr` in the deferred type registered for its proxy's type.

    The lookup uses the exact type of `expr.proxy()`; a `KeyError` is raised
    if nothing was registered for that type.
    """
    proxy_type = type(expr.proxy())
    deferred_type = cls._pandas_type_map[proxy_type]
    return deferred_type(expr)

  def _elementwise(self, func, name=None, other_args=(), inplace=False):
    """Applies `func` elementwise to this object followed by `other_args`."""
    elementwise = _elementwise_function(func, name, inplace=inplace)
    return elementwise(self, *other_args)


def name_and_func(method):
  """Resolves `method` into a `(name, callable)` pair.

  Args:
    method: either a callable, or a `str` naming an attribute to be looked up
      on the first argument at call time.

  Returns:
    For a callable, `(method.__name__, method)`. For a string, the string
    itself and a function that calls the attribute of that name on its first
    argument, forwarding the remaining positional and keyword arguments.
  """
  if not isinstance(method, str):
    return method.__name__, method

  # The attribute lookup is deferred until the call so that it is resolved
  # against whatever object is actually passed in.
  call_by_name = lambda df, *args, **kwargs: getattr(df, method)(
      *args, **kwargs)
  return method, call_by_name


def _elementwise_method(func, name=None, restrictions=None, inplace=False):
  """Builds an elementwise deferred operation from a method or method name.

  Args:
    func: a callable, or (when `name` is None) optionally a `str` naming the
      method to invoke on the first argument; see `name_and_func`.
    name: name of the operation. When None, both the name and the callable
      are derived from `func` via `name_and_func`.
    restrictions: optional dict of argument name to permitted value(s),
      forwarded to `_elementwise_function`.
    inplace: whether the operation mutates its first argument, forwarded to
      `_elementwise_function`.

  Returns:
    The wrapper produced by `_elementwise_function`.
  """
  if name is None:
    name, func = name_and_func(func)
  if restrictions is None:
    restrictions = {}
  # `inplace` must be forwarded explicitly; previously it was accepted here
  # but dropped, so in-place methods were silently treated as non-mutating.
  return _elementwise_function(func, name, restrictions, inplace=inplace)


def _elementwise_function(func, name=None, restrictions=None, inplace=False):
  """Wraps `func` so that calling it builds a deferred elementwise expression.

  Args:
    func: the callable eventually applied to the concrete argument values.
    name: name given to the resulting expression and used in error messages;
      defaults to `func.__name__`.
    restrictions: optional dict mapping an argument name of `func` to the
      value, or list of values, that argument is permitted to take.
    inplace: if true, `func` is run on a copy of its first argument (see
      `copy_and_mutate`), the first positional argument's `_expr` is replaced
      by the resulting expression, and that argument is returned.

  Returns:
    A function taking the same arguments as `func`. `DeferredFrame` and
    pandas `NDFrame` positional arguments become inputs of the expression;
    all other positional arguments and all keyword arguments are captured as
    constants.

  Raises:
    NotImplementedError: (from the returned function) if a restricted
      argument is given a value outside its permitted set.
  """
  if name is None:
    name = func.__name__
  if restrictions is None:
    restrictions = {}

  # inspect.getargspec was removed in Python 3.11, and on earlier Python 3
  # versions it raises ValueError for functions with annotations or
  # keyword-only arguments, which silently disabled the restriction check
  # below. Prefer getfullargspec and only fall back on Python 2.
  try:
    getargspec = inspect.getfullargspec
  except AttributeError:
    # pylint: disable=deprecated-method
    getargspec = inspect.getargspec

  def wrapper(*args, **kwargs):
    # Validate restricted arguments, whether passed by keyword or position.
    for key, values in restrictions.items():
      if key in kwargs:
        value = kwargs[key]
      else:
        try:
          ix = getargspec(func).args.index(key)
        except ValueError:
          # `key` is not a positional parameter of `func`.
          # TODO: fix for delegation?
          continue
        if len(args) <= ix:
          # Argument not supplied; nothing to check.
          continue
        value = args[ix]
      if not isinstance(values, list):
        values = [values]
      if value not in values:
        raise NotImplementedError(
            '%s=%s not supported for %s' % (key, value, name))

    # Split positional arguments into expression inputs and plain constants,
    # remembering the position of each expression input.
    deferred_arg_indices = []
    deferred_arg_exprs = []
    constant_args = [None] * len(args)
    for ix, arg in enumerate(args):
      if isinstance(arg, DeferredFrame):
        deferred_arg_indices.append(ix)
        deferred_arg_exprs.append(arg._expr)
      elif isinstance(arg, pd.core.generic.NDFrame):
        # Concrete pandas objects are lifted into constant expressions; the
        # empty slice is passed as the second argument (presumably the
        # proxy -- confirm against expressions.ConstantExpression).
        deferred_arg_indices.append(ix)
        deferred_arg_exprs.append(expressions.ConstantExpression(arg, arg[0:0]))
      else:
        constant_args[ix] = arg

    if inplace:
      actual_func = copy_and_mutate(func)
    else:
      actual_func = func

    def apply(*actual_args):
      # Re-insert the evaluated inputs at their original positions.
      full_args = list(constant_args)
      for ix, arg in zip(deferred_arg_indices, actual_args):
        full_args[ix] = arg
      return actual_func(*full_args, **kwargs)

    result_expr = expressions.elementwise_expression(
        name, apply, deferred_arg_exprs)
    if inplace:
      args[0]._expr = result_expr
      return args[0]

    else:
      return DeferredFrame.wrap(result_expr)

  return wrapper


def copy_and_mutate(func):
  """Turns a mutating `func` into one that returns a mutated copy.

  The returned function calls `copy()` on its first argument, applies `func`
  to that copy with the remaining arguments, discards whatever `func`
  returns, and returns the copy. The original first argument is not passed
  to `func`.
  """
  def mutate_copy(self, *args, **kwargs):
    duplicate = self.copy()
    func(duplicate, *args, **kwargs)
    return duplicate

  return mutate_copy
